package com.shujia.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo3KafkaSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> linesDS = env.readTextFile("flink/data/students.csv");

        //将结果保存到kafka中
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")//broker列表
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("students")//指定topic
                        .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
                        .build()
                )
                //指定数据处理的语义AT_LEAST_ONCE：至少一次，EXACTLY_ONCE：唯一一次
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        //使用kafka sink
        linesDS.sinkTo(sink);

        env.execute();

    }
}
